package org.idea.mq.framework.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.idea.mq.framework.rocketmq.common.FastJsonSerializer;
import org.idea.mq.framework.rocketmq.common.MqMsgSerializer;

import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Author linhao
 * @Date created in 8:47 上午 2022/2/20
 */
public class ConsumerApplication {

    public static final String GROUP_NAME = "test-mq-topic-consumer";
    public static final String NAME_SERVER = "localhost:9876";
    public static final Integer CONSUME_TIME_OUT = 1000;
    public static MqMsgSerializer mqMsgSerializer = new FastJsonSerializer();

    public static void startConsumer(String topic, long sleepTime) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setNamesrvAddr(NAME_SERVER);
        consumer.setConsumerGroup(GROUP_NAME);
        consumer.setConsumeTimeout(CONSUME_TIME_OUT);
        consumer.setConsumeMessageBatchMaxSize(1);
        //只消费第一条队列的消息
//        consumer.setAllocateMessageQueueStrategy(new AlwaysFirstQueueAveragely());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    byte[] bytes = messageExt.getBody();
                    String result = mqMsgSerializer.unSerializer(bytes, String.class);
                    System.out.println(System.currentTimeMillis() + " result is :" + result);
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.subscribe(topic, "*");
        consumer.start();
        System.out.println("启动了" + topic);
    }

    /**
     * 同一个进程里面只能有一个group name
     *
     * @param args
     * @throws MQClientException
     */
    public static void main(String[] args) throws MQClientException {
        startConsumer("test-mq-topic", 1000);
    }
}
